DPDK系列之五DPDK的并发模型 |
您所在的位置:网站首页 › map plugins是什么意思 › DPDK系列之五DPDK的并发模型 |
一、DPDK的并发 前面提到了网络爆发式的发展,提出了C10K到c100K甚至更多并发的要求,其实本质上还是对数据接收和处理的速度,即从软件、硬件实现整体的最大性能的平衡。这时候在软件上多线程、多进程以至于并行编程就拿到台面上,毕竟这是解决接收和处理数据的一个最有力的方法和手段。 在前面也提到过,DPDK的优势在于缩短了数据流的路径,但另外一个重要的方面就是多核(CPU)的利用以及在指令处理方面使用了SIMD。正如前面所分析,合理的利用核心的数量和并处理好缓存的命中,就可以大幅的提高处理的效率,再加上类似于批处理的数量优势,自然就会有大幅度的效能的提升。 说得更通俗一些,就是要挖掘多核和多CPU以及硬件的资源,但这些资源的挖掘尽量要从软件上来实现。这就是DPDK并发的意义。 二、多线程模型及源码分析DPDK中的线程创建是基于Posix库创建的,从这方面来看,就没有脱离开上层应用编程的范围。说这个的意思就是说,这块本质上是没有难度的。看一下相关的线程处理代码: /* Launch threads, called at application init(). */ int rte_eal_init(int argc, char **argv) { int i, fctret, ret; pthread_t thread_id; static rte_atomic32_t run_once = RTE_ATOMIC32_INIT(0); const char *p; static char logid[PATH_MAX]; char cpuset[RTE_CPU_AFFINITY_STR_LEN]; char thread_name[RTE_MAX_THREAD_NAME_LEN]; bool phys_addrs; /* checks if the machine is adequate * / if (!rte_cpu_is_supported()) { rte_eal_init_alert("unsupported cpu type."); rte_errno = ENOTSUP; return -1; } if (!rte_atomic32_test_and_set(&run_once)) { rte_eal_init_alert("already called initialization."); rte_errno = EALREADY; return -1; } p = strrchr(argv[0], '/'); strlcpy(logid, p ? p + 1 : argv[0], sizeof(logid)); thread_id = pthread_self(); eal_reset_internal_config(&internal_config); /* set log level as early as possible * / eal_log_level_parse(argc, argv); if (rte_eal_cpu_init() < 0) { rte_eal_init_alert("Cannot detect lcores."); rte_errno = ENOTSUP; return -1; } fctret = eal_parse_args(argc, argv); if (fctret < 0) { rte_eal_init_alert("Invalid 'command line' arguments."); rte_errno = EINVAL; rte_atomic32_clear(&run_once); return -1; } if (eal_plugins_init() < 0) { rte_eal_init_alert("Cannot init plugins"); rte_errno = EINVAL; rte_atomic32_clear(&run_once); return -1; } if (eal_option_device_parse()) { rte_errno = ENODEV; rte_atomic32_clear(&run_once); return -1; } if (rte_config_init() < 0) { rte_eal_init_alert("Cannot init config"); return -1; } if (rte_eal_intr_init() < 0) { rte_eal_init_alert("Cannot init interrupt-handling thread"); return -1; } if (rte_eal_alarm_init() < 0) { rte_eal_init_alert("Cannot init alarm"); /* rte_eal_alarm_init sets rte_errno on failure. */ return -1; } /* Put mp channel init before bus scan so that we can init the vdev * bus through mp channel in the secondary process before the bus scan. * / if (rte_mp_channel_init() < 0 && rte_errno != ENOTSUP) { rte_eal_init_alert("failed to init mp channel"); if (rte_eal_process_type() == RTE_PROC_PRIMARY) { rte_errno = EFAULT; return -1; } } /* register multi-process action callbacks for hotplug * / if (eal_mp_dev_hotplug_init() < 0) { rte_eal_init_alert("failed to register mp callback for hotplug"); return -1; } if (rte_bus_scan()) { rte_eal_init_alert("Cannot scan the buses for devices"); rte_errno = ENODEV; rte_atomic32_clear(&run_once); return -1; } phys_addrs = rte_eal_using_phys_addrs() != 0; /* if no EAL option "--iova-mode=", use bus IOVA scheme */ if (internal_config.iova_mode == RTE_IOVA_DC) { /* autodetect the IOVA mapping mode */ enum rte_iova_mode iova_mode = rte_bus_get_iommu_class(); if (iova_mode == RTE_IOVA_DC) { RTE_LOG(DEBUG, EAL, "Buses did not request a specific IOVA mode.\n"); if (!phys_addrs) { /* if we have no access to physical addresses, * pick IOVA as VA mode. * / iova_mode = RTE_IOVA_VA; RTE_LOG(DEBUG, EAL, "Physical addresses are unavailable, selecting IOVA as VA mode.\n"); #if defined(RTE_LIBRTE_KNI) && LINUX_VERSION_CODE >= KERNEL_VERSION(4, 10, 0) } else if (rte_eal_check_module("rte_kni") == 1) { iova_mode = RTE_IOVA_PA; RTE_LOG(DEBUG, EAL, "KNI is loaded, selecting IOVA as PA mode for better KNI performance.\n"); #endif } else if (is_iommu_enabled()) { /* we have an IOMMU, pick IOVA as VA mode */ iova_mode = RTE_IOVA_VA; RTE_LOG(DEBUG, EAL, "IOMMU is available, selecting IOVA as VA mode.\n"); } else { /* physical addresses available, and no IOMMU * found, so pick IOVA as PA. */ iova_mode = RTE_IOVA_PA; RTE_LOG(DEBUG, EAL, "IOMMU is not available, selecting IOVA as PA mode.\n"); } } #if defined(RTE_LIBRTE_KNI) && LINUX_VERSION_CODE < KERNEL_VERSION(4, 10, 0) /* Workaround for KNI which requires physical address to work * in kernels < 4.10 * / if (iova_mode == RTE_IOVA_VA && rte_eal_check_module("rte_kni") == 1) { if (phys_addrs) { iova_mode = RTE_IOVA_PA; RTE_LOG(WARNING, EAL, "Forcing IOVA as 'PA' because KNI module is loaded\n"); } else { RTE_LOG(DEBUG, EAL, "KNI can not work since physical addresses are unavailable\n"); } } #endif rte_eal_get_configuration()->iova_mode = iova_mode; } else { rte_eal_get_configuration()->iova_mode = internal_config.iova_mode; } if (rte_eal_iova_mode() == RTE_IOVA_PA && !phys_addrs) { rte_eal_init_alert("Cannot use IOVA as 'PA' since physical addresses are not available"); rte_errno = EINVAL; return -1; } RTE_LOG(INFO, EAL, "Selected IOVA mode '%s'\n", rte_eal_iova_mode() == RTE_IOVA_PA ? "PA" : "VA"); if (internal_config.no_hugetlbfs == 0) { /* rte_config isn't initialized yet * / ret = internal_config.process_type == RTE_PROC_PRIMARY ? eal_hugepage_info_init() : eal_hugepage_info_read(); if (ret < 0) { rte_eal_init_alert("Cannot get hugepage information."); rte_errno = EACCES; rte_atomic32_clear(&run_once); return -1; } } if (internal_config.memory == 0 && internal_config.force_sockets == 0) { if (internal_config.no_hugetlbfs) internal_config.memory = MEMSIZE_IF_NO_HUGE_PAGE; } if (internal_config.vmware_tsc_map == 1) { #ifdef RTE_LIBRTE_EAL_VMWARE_TSC_MAP_SUPPORT rte_cycles_vmware_tsc_map = 1; RTE_LOG (DEBUG, EAL, "Using VMWARE TSC MAP, " "you must have monitor_control.pseudo_perfctr = TRUE\n"); #else RTE_LOG (WARNING, EAL, "Ignoring --vmware-tsc-map because " "RTE_LIBRTE_EAL_VMWARE_TSC_MAP_SUPPORT is not set\n"); #endif } if (rte_eal_log_init(logid, internal_config.syslog_facility) < 0) { rte_eal_init_alert("Cannot init logging."); rte_errno = ENOMEM; rte_atomic32_clear(&run_once); return -1; } #ifdef VFIO_PRESENT if (rte_eal_vfio_setup() < 0) { rte_eal_init_alert("Cannot init VFIO"); rte_errno = EAGAIN; rte_atomic32_clear(&run_once); return -1; } #endif /* in secondary processes, memory init may allocate additional fbarrays * not present in primary processes, so to avoid any potential issues, * initialize memzones first. */ if (rte_eal_memzone_init() < 0) { rte_eal_init_alert("Cannot init memzone"); rte_errno = ENODEV; return -1; } if (rte_eal_memory_init() < 0) { rte_eal_init_alert("Cannot init memory"); rte_errno = ENOMEM; return -1; } /* the directories are locked during eal_hugepage_info_init */ eal_hugedirs_unlock(); if (rte_eal_malloc_heap_init() < 0) { rte_eal_init_alert("Cannot init malloc heap"); rte_errno = ENODEV; return -1; } if (rte_eal_tailqs_init() < 0) { rte_eal_init_alert("Cannot init tail queues for objects"); rte_errno = EFAULT; return -1; } if (rte_eal_timer_init() < 0) { rte_eal_init_alert("Cannot init HPET or TSC timers"); rte_errno = ENOTSUP; return -1; } eal_check_mem_on_local_socket(); eal_thread_init_master(rte_config.master_lcore); ret = eal_thread_dump_affinity(cpuset, sizeof(cpuset)); RTE_LOG(DEBUG, EAL, "Master lcore %u is ready (tid=%zx;cpuset=[%s%s])\n", rte_config.master_lcore, (uintptr_t)thread_id, cpuset, ret == 0 ? "" : "..."); RTE_LCORE_FOREACH_SLAVE(i) { /* * create communication pipes between master thread * and children * / if (pipe(lcore_config[i].pipe_master2slave) < 0) rte_panic("Cannot create pipe\n"); if (pipe(lcore_config[i].pipe_slave2master) < 0) rte_panic("Cannot create pipe\n"); lcore_config[i].state = WAIT; /* create a thread for each lcore * / ret = pthread_create(&lcore_config[i].thread_id, NULL, eal_thread_loop, NULL); if (ret != 0) rte_panic("Cannot create thread\n"); /* Set thread_name for aid in debugging. * / snprintf(thread_name, sizeof(thread_name), "lcore-slave-%d", i); ret = rte_thread_setname(lcore_config[i].thread_id, thread_name); if (ret != 0) RTE_LOG(DEBUG, EAL, "Cannot set name for lcore thread\n"); } /* * Launch a dummy function on all slave lcores, so that master lcore * knows they are all ready when this function returns. */ rte_eal_mp_remote_launch(sync_func, NULL, SKIP_MASTER); rte_eal_mp_wait_lcore(); /* initialize services so vdevs register service during bus_probe. */ ret = rte_service_init(); if (ret) { rte_eal_init_alert("rte_service_init() failed"); rte_errno = ENOEXEC; return -1; } /* Probe all the buses and devices/drivers on them */ if (rte_bus_probe()) { rte_eal_init_alert("Cannot probe devices"); rte_errno = ENOTSUP; return -1; } #ifdef VFIO_PRESENT /* Register mp action after probe() so that we got enough info */ if (rte_vfio_is_enabled("vfio") && vfio_mp_sync_setup() < 0) return -1; #endif /* initialize default service/lcore mappings and start running. Ignore * -ENOTSUP, as it indicates no service coremask passed to EAL. */ ret = rte_service_start_with_defaults(); if (ret < 0 && ret != -ENOTSUP) { rte_errno = ENOEXEC; return -1; } /* * Clean up unused files in runtime directory. We do this at the end of * init and not at the beginning because we want to clean stuff up * whether we are primary or secondary process, but we cannot remove * primary process' files because secondary should be able to run even * if primary process is dead. * * In no_shconf mode, no runtime directory is created in the first * place, so no cleanup needed. * / if (!internal_config.no_shconf && eal_clean_runtime_dir() < 0) { rte_eal_init_alert("Cannot clear runtime directory"); return -1; } eal_mcfg_complete(); /* Call each registered callback, if enabled * / rte_option_init(); return fctret; }这个函数在前面的文章中的例程中用过。看看开头的几个变量和rte_eal_cpu_init函数,不用看内部的实现,都应该猜到是什么了。在rte_eal_cpu_init处理好CPU,开始调用eal_parse_args来处理参数进行Master核的确定和CPU的管理(确定哪核可用)。 到这里,基本就应该明白要怎么使用线程了,再加上开头的pthread_t thread_id;应该立刻明白了吧。并行计算中处理的第一个要务就是对CPU的分配和管理,包括一些CPU控制在内。 接着往下看就是一系列的初始化如前面文章提到的,配置、内存、内存池、队列、定时器等等。最后到主核心线程和各个子线程核心的启动: eal_thread_init_master(rte_config.master_lcore); ret = eal_thread_dump_affinity(cpuset, sizeof(cpuset)); RTE_LOG(DEBUG, EAL, "Master lcore %u is ready (tid=%zx;cpuset=[%s%s])\n", rte_config.master_lcore, (uintptr_t)thread_id, cpuset, ret == 0 ? "" : "..."); RTE_LCORE_FOREACH_SLAVE(i) { /* * create communication pipes between master thread * and children */ if (pipe(lcore_config[i].pipe_master2slave) < 0) rte_panic("Cannot create pipe\n"); if (pipe(lcore_config[i].pipe_slave2master) < 0) rte_panic("Cannot create pipe\n"); lcore_config[i].state = WAIT; /* create a thread for each lcore */ ret = pthread_create(&lcore_config[i].thread_id, NULL, eal_thread_loop, NULL); if (ret != 0) rte_panic("Cannot create thread\n"); /* Set thread_name for aid in debugging. * / snprintf(thread_name, sizeof(thread_name), "lcore-slave-%d", i); ret = rte_thread_setname(lcore_config[i].thread_id, thread_name); if (ret != 0) RTE_LOG(DEBUG, EAL, "Cannot set name for lcore thread\n"); }eal_thread_init_master和eal_thread_loop它们都会调用: /* set affinity for current EAL thread */ static int eal_thread_set_affinity(void) { unsigned lcore_id = rte_lcore_id(); /* acquire system unique id */ rte_gettid(); /* update EAL thread core affinity * / return rte_thread_set_affinity(&lcore_config[lcore_id].cpuset); }来进行CPU的绑定。然后不同模块为了使用线程,就需要进行注册,即创建后紧随的调用rte_eal_mp_remote_launch: /* * Launch a dummy function on all slave lcores, so that master lcore * knows they are all ready when this function returns. */ rte_eal_mp_remote_launch(sync_func, NULL, SKIP_MASTER); rte_eal_mp_wait_lcore();学习地址:Dpdk/网络协议栈/vpp/OvS/DDos/NFV/虚拟化/高性能专家(免费订阅,永久学习) 【文章福利】需要更多DPDK/SPDK学习资料加群793599096(资料包括C/C++,Linux,golang技术,内核,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,大厂面试题 等)可以自行添加学习交流群点击这里噢~ 初始的是一个空的系统函数。有兴趣可以搜一搜这个函数的使用,你会发现真得好多。它的定义: /* * Check that every SLAVE lcores are in WAIT state, then call * rte_eal_remote_launch() for all of them. If call_master is true * (set to CALL_MASTER), also call the function on the master lcore. */ int rte_eal_mp_remote_launch(int (*f)(void *), void *arg, enum rte_rmt_call_master_t call_master) { int lcore_id; int master = rte_get_master_lcore(); /* check state of lcores */ RTE_LCORE_FOREACH_SLAVE(lcore_id) { if (lcore_config[lcore_id].state != WAIT) return -EBUSY; } /* send messages to cores * / RTE_LCORE_FOREACH_SLAVE(lcore_id) { rte_eal_remote_launch(f, arg, lcore_id); } if (call_master == CALL_MASTER) { lcore_config[master].ret = f(arg); lcore_config[master].state = FINISHED; } return 0; } /* * Send a message to a slave lcore identified by slave_id to call a * function f with argument arg. Once the execution is done, the * remote lcore switch in FINISHED state. */ int rte_eal_remote_launch(int (*f)(void *), void *arg, unsigned slave_id) { int n; char c = 0; int m2s = lcore_config[slave_id].pipe_master2slave[1]; int s2m = lcore_config[slave_id].pipe_slave2master[0]; if (lcore_config[slave_id].state != WAIT) return -EBUSY; lcore_config[slave_id].f = f; lcore_config[slave_id].arg = arg; /* send message * / n = 0; while (n == 0 || (n < 0 && errno == EINTR)) n = write(m2s, &c, 1); if (n < 0) rte_panic("cannot write on configuration pipe\n"); /* wait ack * / do { n = read(s2m, &c, 1); } while (n < 0 && errno == EINTR); if (n |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |